
When to use this

  • Text-only chatbot (no voice)
  • Support widget (Intercom, Zendesk, custom)
  • WhatsApp or SMS webhook
  • Any HTTP-based channel

Single-user (stateless)

For simple cases where a single DialogMachine handles one conversation at a time:
from fastapi import FastAPI
from superdialog import DialogMachine, Flow

app = FastAPI()
dialog_machine = DialogMachine(flow=Flow.load("kyc.json"), llm="openai/gpt-5.1")

@app.post("/turn")
async def turn(payload: dict):
    reply = await dialog_machine.turn(payload["text"])
    return {"reply": reply.text}

Multi-user (SessionWorker)

For multi-user or multi-worker deployments, route each conversation through a SessionWorker:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from superdialog import DialogMachine, Flow, SessionWorker, InMemorySessionStore

flow = Flow.load("kyc.json")
worker: SessionWorker

@asynccontextmanager
async def lifespan(app: FastAPI):
    global worker
    worker = SessionWorker(
        agent_factory=lambda: DialogMachine(flow=flow, llm="openai/gpt-5.1"),
        store=InMemorySessionStore(),    # swap for RedisSessionStore in production (v0.3)
    )
    yield

app = FastAPI(lifespan=lifespan)

@app.post("/turn")
async def turn(payload: dict):
    async with worker.acquire(payload["session_id"]) as h:
        result = await h.turn(payload["text"])
    return {"reply": result.text}

The SessionWorker:
  • Creates one DialogMachine per active session
  • Shares the immutable Flow by reference
  • Serialises concurrent requests for the same session_id
  • Runs concurrent requests for different session IDs fully in parallel
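The last two guarantees can be pictured with a plain asyncio sketch. This is an illustration of the general technique (one lock per session ID), not superdialog's actual implementation; PerSessionLocks, handle_turn, and demo are hypothetical names invented for this example, and the sleep stands in for the LLM call.

```python
import asyncio
from collections import defaultdict


class PerSessionLocks:
    """Illustrative stand-in only; not superdialog's SessionWorker."""

    def __init__(self) -> None:
        # One lock per session_id, created lazily on first use.
        self._locks = defaultdict(asyncio.Lock)

    def acquire(self, session_id: str) -> asyncio.Lock:
        # asyncio.Lock is an async context manager, so callers can write
        # `async with locks.acquire(session_id): ...`
        return self._locks[session_id]


async def handle_turn(locks: PerSessionLocks, session_id: str, text: str, log: list) -> None:
    # Requests for the same session_id queue up on the same lock;
    # requests for other session IDs use a different lock and do not wait.
    async with locks.acquire(session_id):
        log.append(("start", session_id, text))
        await asyncio.sleep(0.01)  # stands in for the model call
        log.append(("end", session_id, text))


async def demo() -> list:
    locks = PerSessionLocks()
    log: list = []
    await asyncio.gather(
        handle_turn(locks, "a", "first", log),
        handle_turn(locks, "a", "second", log),
        handle_turn(locks, "b", "other", log),
    )
    return log


if __name__ == "__main__":
    for event in asyncio.run(demo()):
        print(event)
```

In this sketch the two turns for session "a" never overlap, while the turn for session "b" starts before the first "a" turn has finished.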

Streaming endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from superdialog import DialogMachine, Flow

app = FastAPI()
dialog_machine = DialogMachine(flow=Flow.load("kyc.json"), llm="openai/gpt-5.1")

@app.post("/stream")
async def stream(payload: dict):
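    async def generate():
        # Named `chunks` so it does not shadow the enclosing `stream` handler.
        chunks = await dialog_machine.turn(payload["text"], stream=True)
        async for chunk in chunks:
            # Forward each piece of text to the client as soon as it arrives.
            yield chunk.text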

    return StreamingResponse(generate(), media_type="text/plain")

Using FastAPIRouter

The built-in adapter mounts /turn, /stream, and /reset in one line:
from fastapi import FastAPI
from superdialog import DialogMachine, Flow
from superdialog.adapters.fastapi import FastAPIRouter

dialog_machine = DialogMachine(flow=Flow.load("kyc.json"), llm="openai/gpt-5.1")

app = FastAPI()
app.include_router(FastAPIRouter(dialog_machine), prefix="/dialog")
# Exposes: POST /dialog/turn, POST /dialog/stream, POST /dialog/reset

Request / response shape

// POST /turn
{ "text": "Hello, I need to verify my KYC.", "session_id": "user-42" }

// Response
{ "reply": "Sure! Could you please provide the last 4 digits of your Aadhaar?" }
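For a quick check of this shape from the client side, a minimal standard-library sketch might look like the following. The helper names (build_turn_request, parse_turn_response, send_turn) and the base URL are assumptions made for illustration; only the "text", "session_id", and "reply" fields come from the shape shown above.

```python
import json
import urllib.request


def build_turn_request(base_url: str, text: str, session_id: str) -> urllib.request.Request:
    """Build the POST /turn request with the JSON body shown above."""
    body = json.dumps({"text": text, "session_id": session_id}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/turn",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def parse_turn_response(raw: bytes) -> str:
    """Extract the "reply" field from the JSON response body."""
    return json.loads(raw.decode("utf-8"))["reply"]


def send_turn(base_url: str, text: str, session_id: str, timeout: float = 30.0) -> str:
    """Send one turn and return the reply text (needs a running server)."""
    request = build_turn_request(base_url, text, session_id)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return parse_turn_response(response.read())


if __name__ == "__main__":
    # Hypothetical local address; adjust to wherever the app is served.
    print(send_turn("http://localhost:8000", "Hello, I need to verify my KYC.", "user-42"))
```

Reusing the same session_id across calls is what lets the multi-user endpoint route follow-up messages to the same conversation.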

Deploying to production

For a production FastAPI deployment running multiple workers:
  1. Replace InMemorySessionStore with RedisSessionStore (v0.3) so state survives across workers
  2. Set max_sessions on SessionWorker to cap memory usage
  3. Use NullSessionStore if your sessions are fully stateless (e.g. webhook-per-message pattern)
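To see why a session cap bounds memory, here is a small sketch of a registry that keeps at most max_sessions live objects and evicts the least recently used one. How SessionWorker actually enforces max_sessions is not described on this page, so the eviction policy and the BoundedSessions class below are assumptions for illustration only.

```python
from collections import OrderedDict
from typing import Callable, Generic, List, TypeVar

T = TypeVar("T")


class BoundedSessions(Generic[T]):
    """Hypothetical LRU-capped registry; not part of superdialog."""

    def __init__(self, factory: Callable[[], T], max_sessions: int) -> None:
        if max_sessions < 1:
            raise ValueError("max_sessions must be at least 1")
        self._factory = factory
        self._max_sessions = max_sessions
        self._items: "OrderedDict[str, T]" = OrderedDict()

    def get(self, session_id: str) -> T:
        if session_id in self._items:
            # Mark an existing session as most recently used.
            self._items.move_to_end(session_id)
        else:
            # Create the session, then evict the oldest one if over the cap.
            self._items[session_id] = self._factory()
            if len(self._items) > self._max_sessions:
                self._items.popitem(last=False)
        return self._items[session_id]

    def session_ids(self) -> List[str]:
        # Ordered from least to most recently used.
        return list(self._items)


if __name__ == "__main__":
    sessions = BoundedSessions(factory=dict, max_sessions=2)
    sessions.get("a")
    sessions.get("b")
    sessions.get("a")  # "a" becomes the most recently used
    sessions.get("c")  # over the cap, so "b" is evicted
    print(sessions.session_ids())
```

With an in-memory store, an evicted session's state is simply gone, which is one reason a persistent store matters once a cap is in place.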